package com.atuguigu.flink.Day02.window;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
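// Uses `AggregateFunction` to compute the minimum and maximum temperature within a tumbling window.
// The aggregation is incremental: each element is folded into an accumulator as it arrives.
// TODO 1: first approach, implemented with a Tuple3 of (id, min, max)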
/**
 * Example job: reads sensor readings from {@link SensorSource}, keeps only those whose id is
 * {@code "sensor_1"}, keys the stream by sensor id, groups it into 5-second tumbling
 * processing-time windows and prints one {@code (id, min, max)} tuple per window.
 */
public class Example3 {
    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Single parallel instance so that the printed output is not interleaved across subtasks.
        env.setParallelism(1);

        DataStreamSource<SendsorReading> stream = env.addSource(new SensorSource());

        stream.filter(
              f->f.id.equals("sensor_1")
        ).keyBy(
                new KeySelector<SendsorReading, String>() {
                    @Override
                    public String getKey(SendsorReading sendsorReading) throws Exception {
                        return sendsorReading.id;
                    }
                }
        ).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new MinMaxTemp())
                .print();

        env.execute();
    }

    /**
     * Incrementally tracks the minimum and maximum temperature of the readings it is given.
     *
     * <p>Accumulator and result share the same layout: {@code f0} = sensor id,
     * {@code f1} = minimum temperature seen so far, {@code f2} = maximum temperature seen so far.
     */
    public static  class MinMaxTemp implements AggregateFunction<SendsorReading, Tuple3<String,Double,Double>,Tuple3<String,Double,Double>> {

        /**
         * Creates an empty accumulator.
         *
         * <p>The minimum slot starts at positive infinity and the maximum slot at negative
         * infinity, so the first reading always replaces both. Note that
         * {@code Double.MIN_VALUE} is the smallest <em>positive</em> double and therefore is not
         * a usable seed for either slot.
         *
         * @return an accumulator with an empty id and identity values for min and max
         */
        @Override
        public Tuple3<String, Double, Double> createAccumulator() {
            return Tuple3.of("", Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
        }

        /**
         * Folds one reading into the accumulator.
         *
         * @param sendsorReading the incoming reading
         * @param accumulator    the current {@code (id, min, max)} state
         * @return a new accumulator carrying the reading's id and the updated min and max
         */
        @Override
        public Tuple3<String, Double, Double> add(SendsorReading sendsorReading, Tuple3<String, Double, Double> accumulator) {
            return Tuple3.of(
                    sendsorReading.id,
                    Math.min(sendsorReading.temperture, accumulator.f1),
                    Math.max(sendsorReading.temperture, accumulator.f2));
        }

        /**
         * Returns the accumulator unchanged as the aggregation result.
         *
         * @param accumulator the final {@code (id, min, max)} state
         * @return the same tuple
         */
        @Override
        public Tuple3<String, Double, Double> getResult(Tuple3<String, Double, Double> accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial accumulators into one.
         *
         * <p>The tumbling window used in {@code main} is not expected to trigger merging, but a
         * merging window assigner (for example session windows) would call this method, so it
         * must return a valid accumulator rather than {@code null}.
         *
         * @param left  first partial accumulator
         * @param right second partial accumulator
         * @return an accumulator holding the smaller minimum and the larger maximum; the id is
         *         taken from {@code left} unless {@code left} has not seen any reading yet
         */
        @Override
        public Tuple3<String, Double, Double> merge(Tuple3<String, Double, Double> left, Tuple3<String, Double, Double> right) {
            // An accumulator that has not received a reading still carries the empty seed id.
            String id = left.f0.isEmpty() ? right.f0 : left.f0;
            return Tuple3.of(id, Math.min(left.f1, right.f1), Math.max(left.f2, right.f2));
        }
    }
}
